package com.stay4it.rxjava;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.stay4it.rxjava.bean.Student;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;

/**
 * RxJava过滤操作符
 * 参考文章：http://blog.csdn.net/jdsjlzx/article/details/51489061
 */
public class RxJava05 {
	static List<Student> studentList = new ArrayList<Student>(){
		{
			add(new Student("Stay", 28));
			add(new Student("谷歌小弟", 23));
			add(new Student("Star", 25));
		}
	};
	
	/**
	 * Filter
	 * Filter只会返回满足过滤条件的数据
	 */
	private static void test1(){
		Observable.from(studentList)
		.filter(new Func1<Student, Boolean>() {

			@Override
			public Boolean call(Student t) {
				return t.age>25;
			}
		})
		.subscribe(new Action1<Student>() {

			@Override
			public void call(Student student) {
				System.out.println("student = " + student);			
			}
		});
		
	}
	
	/**
	 * Debounce系列操作符
	 * Debounce系列操作符会过滤掉发射速率过快的数据项，相当于限流，但是需要注意的是debounce过滤掉的数据会被丢弃掉。 
	 * 如果在一个指定的时间间隔过去了仍旧没有发射一个，那么它将发射最后的那个。 
	 * 
	 * RxJava将这个操作符实现为throttleWithTimeout和debounce.
	 * 
	 * 简单粗暴的说法：当N个结点发生的时间太靠近（即发生的时间差小于设定的值T），debounce就会自动过滤掉前N-1个结点。 
	 * 场景：比如EidtText输入联想，可以使用debounce减少频繁的网络请求。避免每输入（删除）一个字就做一次联想。 
	 * 和switchMap结合使用效果更佳，一个用于取消上次请求，一个用于节流。
	 * 
	 * 注意：下面方法换做throttleWithTimeout操作符输出结果一致。
	 */
	private static void test2(){
		Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                
            	for (int i=0;i<10;i++) { 
                    subscriber.onNext(i); 
                    int time=100; 
                    if (i%3==0) { 
                        time=400; 
                    } 
                    try { 
                         Thread.sleep(time); 
                    } catch (InterruptedException e) { 
                         e.printStackTrace(); 
                    } 
                } 
                subscriber.onCompleted(); 
            }
        })
        .debounce(300, TimeUnit.MILLISECONDS)  //超时时间为300毫秒，如果这里设置的超时时间大于400ms或者小于100ms，则会出现不同的结果
		.subscribe(new Action1<Integer>() {

			@Override
			public void call(Integer value) {
				System.out.println("value = " + value);				
			}
		});
		
	}
	
	/**
	 * Debounce操作符
	 * 不仅可以使用时间来进行过滤，还可以根据一个函数来进行限流。
	 * 如果源Observable在发射一个新的数据的时候，上一个数据根据函数所生成的临时Observable还没有结束，那么上一个数据就会被过滤掉。
	 * 
	 * 注意：不管怎么过滤条件，Observable中的最后一项数据总是会输出；
	 */
	private static void test3(){
		Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
		.debounce(new Func1<Integer, Observable<Integer>>() {

			@Override
			public Observable<Integer> call(Integer t) {
				return Observable.create(new OnSubscribe<Integer>() {

					@Override
					public void call(Subscriber<? super Integer> subscriber) {
						//如果%2==0，则发射数据并调用了onCompleted结束，则不会被丢弃
	                    if (t % 2 == 0) {
	                        subscriber.onNext(t);
	                        subscriber.onCompleted();
	                    }
					}
				});
				
			}
		})
		.subscribe(new Action1<Integer>() {

			@Override
			public void call(Integer value) {
				System.out.println("value = " + value);				
			}
		});
	}
	
	/**
	 * ThrottleWithTimeout操作符
	 * 如果在设定好的时间结束前源Observable有新的数据发射出来，这个数据就会被丢弃，同时重新开始计时。 
	 * 运行结果具有不确定性
	 * 表现在：
	 * 1.休眠时间与设定好的时间相等时，结果具有不确定性；
	 * 2.休眠时间小于设定好的时间时，结果具有确定性，即输出最后一项数据（如果在设定好的时间结束前源Observable有新的数据发射出来，这个数据就会被丢弃，同时重新开始计时）；
	 * 3.休眠时间大于设定好的时间时，输出所有数据（原因是在设定好的时间结束前源Observable没有新的数据发射出来，ThrottleWithTimeout对计时作用失效）；
	 * 4.不管怎么设定时间，Observable中的最后一项数据总是会输出；
	 * 
	 * 注意：ThrottleWithTimeout则只有跟使用时间参数来限流的Debounce一样的功能
	 */
	private static void test4(){
		
		Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
		.flatMap(new Func1<Integer, Observable<Integer>>() {

			@Override
			public Observable<Integer> call(Integer t) {
				int sleep = 100;
                if (t % 3 == 0) {
                    sleep = 500;
                }
                try {
                	System.out.println("沉睡时间：" + sleep + "  t = " + t);
                    Thread.sleep(sleep);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

				return Observable.just(t);
			}
		})
		.throttleWithTimeout(50, TimeUnit.MILLISECONDS)
		.subscribe(new Action1<Integer>() {

			@Override
			public void call(Integer value) {
				System.out.println("value = " + value);				
			}
		});
		
	}
	
	/**
	 * Distinct操作符
	 * 功能就是去重
	 */
	private static void test5(){
		Observable.just(1, 2, 4, 1, 3, 5)
		.distinct()
		.subscribe(new Action1<Integer>() {
			
			@Override
			public void call(Integer value) {
				System.out.println("value = " + value);				
			}
		});
		
	}
	
	/**
	 * DistinctUntilChanged
	 * 去重，与Distinct的区别：不是完全过滤，只是连续N个相同的数据，仅仅保留一个，后面的他就不管了
	 * 
	 */
	private static void test6(){
		Observable.just(1, 2, 3, 3, 3, 1, 2, 3, 3)
		.distinctUntilChanged()
		.subscribe(new Action1<Integer>() {
			
			@Override
			public void call(Integer value) {
				System.out.println("value = " + value);				
			}
		});
	}
	
	/**
	 * OfType
	 * 解释：类似于filter但是又不同，他是按照数据类型进行过滤
	 * 
	 */
	private static void test7(){
		Observable.just("a", 2, 3.0).ofType(String.class)
		.subscribe(new Action1<String>() { 
			  @Override public void call(String value) { 
				  System.out.println("ofType value = " + value);	
			  }
			});
		
	}
	
	/**
	 * Single
	 * 解释：对源Observable发射出的数据进行判断
	 * 如果返回的过滤结果数量不是1，他就抛异常java.lang.IllegalArgumentException: Sequence contains too many elements
	 * 
	 */
	private static void test8(){
		Observable.just(1, 2, 4, 1, 3, 5,6).single(new Func1<Integer, Boolean>() { 
			 @Override public Boolean call(Integer integer) { 
			     //取大于4的唯一一个元素，如果没有满足条件的元素则抛出异常 
			     return integer>=5; 
			 }
			})
		.subscribe(new Action1<Integer>() { 
			@Override public void call(Integer value) { 
				System.out.println("value = " + value);	
			}
		});
		
	}
	
	
	public static void main(String[] args) throws InterruptedException {
		test7();
		
	}
	
	
}
